BigQuery で Airflow DAG のオーケストレーションができるようになりました(Preview)
こんにちは!エノカワです。
BigQuery で Airflow DAG をオーケストレーションする機能が プレビュー となりました。
この機能により、BigQuery で Airflow DAG を表示、トリガー、一時停止することが可能になりました。
You can now view, trigger, and pause Airflow DAGs in BigQuery.
今回は、この新機能を実際に試してみましたので、その内容をご紹介します。
環境作成
まずは、Cloud Composer 3 環境を作成します。
Google Cloud コンソールで Cloud Composer の ページに移動します。
[環境の作成] をクリックし、表示されるバージョンリストから [Composer 3] を選択します。
test-composer3
という名前で、東京リージョン、最新のイメージバージョンを選択し、サービスアカウントなど他はデフォルトのままで作成します。
DAG を作成する
次に、Cloud Composer 環境で実行するDAGを作成します。
今回は、Google Cloud Storage (GCS) にあるファイルを BigQuery に取り込むための Airflow DAG を作成しました。
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
PROJECT_ID = '{プロジェクトID}'
DATASET_ID = 'work'
TABLE_ID = 'sales_data'
BUCKET_NAME = 'cm_enokawa_work'
# DAGの基本設定
default_args = {
'start_date': days_ago(1),
}
with DAG(
dag_id='gcs_to_bigquery',
default_args=default_args,
schedule_interval=None, # 手動で実行する
catchup=False
) as dag:
# GCS から BigQuery へのデータ取り込み
gcs_to_bq = GCSToBigQueryOperator(
task_id='load_gcs_to_bq',
bucket=BUCKET_NAME, # GCS バケット名
source_objects=['sales.csv'], # GCS 内のファイルパス
destination_project_dataset_table=f'{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}', # BigQuery のデータセットとテーブル
source_format='CSV', # ソースファイル形式
write_disposition='WRITE_TRUNCATE', # 既存データの上書き
skip_leading_rows=1 # CSV のヘッダー行をスキップ
)
gcs_to_bq
この DAG では、Google Cloud Storage に保存された CSV ファイル (sales.csv
) を BigQuery のテーブル(sales_data
)にインポートしています。
この DAG を Cloud Composer にデプロイし、Airflow UI で DAG gcs_to_bigquery
が表示されていることを確認します。
DAG を表示する
BigQuery コンソールで、オーケストレーションページに移動し、DAG 表示を確認してみましょう。
先ほどデプロイした DAG gcs_to_bigquery
だけではなく、liveness DAG airflow_monitoring
も表示されていました。
BigQuery に関連する DAG だけではなく、Cloud Composer にデプロイされている DAG すべてが表示対象のようです。
列の表示オプション
列の表示オプションをクリックして、DAG実行の詳細を含む追加の列を表示することができます。
以下の項目が選択可能です。
- 状態
- 名前
- 種類
- ソース
- 送信先
- トリガー
- 作成日時(UTC+9)
- 最終更新(UTC+9)
- 最後に失敗した実行(UTC+9)
- オーナー
- リージョン
- 前回の実行(UTC+9)
フィルタ
検索窓からキーワードで DAG をフィルタリングすることが可能です。
また、画面左側のフィルタパネルの表示対象をフィルタリングすることが可能です。
Composer DAG 以外にも以下の種類が選択可能です。
- ノートブック
- データブック
- データ準備(Data Preparation)
- Dataform
- ワークフロー
また、種類以外にも下記のフィルタ項目があります。
- 州(※)
- 前回の実行ステータス
- 前回の実行時間
- リージョン
※:State(状態)の誤訳と思われます
Cloud Composer コンソールの DAG 表示
ちなみに、Cloud Composer コンソールでも [DAG] タブから DAG 表示することが可能です。
BigQuery オーケストレーションの DAG 表示とは項目が異なっていました。
DAG を実行する
選択した DAG を含む行で、縦三点リーダーから [DAG をトリガー] をクリックすると、DAG を手動でトリガーすることができます。
DAG を手動でトリガーすると、DAG に指定されたスケジュールとは関係なく、DAG を 1 回実行します。
Airflow UI の DAG 詳細画面でトリガー実行されていることが確認できました。
DAG の詳細を表示する
DAG の名前をクリックすると、DAG の詳細を表示することができます。
過去の DAG 実行
[実行] タブでは、過去の DAG 実行が表示されます。
デフォルトで過去 10 日間の DAG 実行が表示されますが、[10日] のドロップダウン メニューで時間範囲を変更することが可能です。
DAG の視覚化
[図] タブでは、タスクの依存関係を含む DAG の視覚化が表示されます。
図上のタスクを選択することで、タスクの詳細を表示することが可能です。
DAG のソース コード
[コード] タブでは、DAG のソース コードが表示されます。
DAG の詳細
[詳細] タブでは、DAG の詳細情報が表示されます。
DAG を一時停止する
選択した DAG を含む行で、縦三点リーダーから [DAG の一時停止] をクリックすると、DAG を一時停止することができます。
一時停止した DAG を含む行で、縦三点リーダーから [DAG の一時停止を解除] をクリックすると、DAG をアクティブにすることができます。
まとめ
以上、BigQuery で Airflow DAG をオーケストレーションする機能のご紹介でした。
BigQuery 上で直接 Airflow DAG の表示や操作ができるので、Cloud Composer に切り替えて操作する手間が減り、データパイプラインの運用が効率的になると感じました。
現在はプレビュー版ですが、今後の正式リリースに向けて、オーケストレーションページでの機能がさらに充実することを期待しています!